package com.alibaba.datax.core.statistics.container.communicator.taskgroup;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.container.collector.AbstractCollector;
import com.alibaba.datax.core.statistics.container.collector.ProcessInnerCollector;
import com.alibaba.datax.core.statistics.container.report.AbstractReporter;
import com.alibaba.datax.core.statistics.container.report.ProcessInnerReporter;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.util.container.CoreConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class StandaloneTGContainerCommunicator extends AbstractTGContainerCommunicator {

    private static final Logger LOG = LoggerFactory
            .getLogger(StandaloneTGContainerCommunicator.class);

    public StandaloneTGContainerCommunicator(Configuration configuration,
                                             Map<Integer, Communication> taskGroupCommunicationMap) {
        super(configuration);
        AbstractCollector collector = new ProcessInnerCollector(taskGroupCommunicationMap);
        setCollector(collector);
        AbstractReporter reporter = new ProcessInnerReporter(collector.getTaskCommunicationMap(), taskGroupCommunicationMap);
        setReporter(reporter);
    }

    @Override
    public void report(Communication communication) {
        super.getReporter().reportTGCommunication(super.taskGroupId, communication);
        // todo
        if ("taskGroup".equalsIgnoreCase(configuration
                .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL))) {
            String jobName = configuration.getString("job.name");
            LOG.info(jobName + ": " + CommunicationTool.Stringify.getSnapshot(communication));
            reportVmInfo();
        }
    }

}
